Skip to content

Conversation

alpreu
Copy link
Contributor

@alpreu alpreu commented Sep 6, 2022

This PR adds the ability to add DeadLetterPolicy beans to the PulsarListener annotation (#79).
Mostly based on Soby's work done in #85

@sobychacko
Copy link
Collaborator

@alpreu, this is cool. I will start reviewing this PR soon.

Copy link
Collaborator

@onobc onobc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alpreu thanks for another awesome contribution! Looks great. I have a few minor comments.

@onobc
Copy link
Collaborator

onobc commented Sep 6, 2022

Thanks for the awesome PR @alpreu .

I have fixed conflict locally but when I run all tests I get a failure in PulsarListenerTests (of course works fine when run by itself).

PulsarListenerTests > DeadLetterPolicyTest STANDARD_OUT
    12:29:31.220 [Test worker] ERROR org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer - Pulsar client exceptions.
    org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"Incompatible schema: exists schema type INT32, new schema type STRING","reqId":2109641079268052675, "remote":"localhost/127.0.0.1:51249", "local":"/127.0.0.1:51436"}
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1014) ~[pulsar-client-api-2.10.1.jar:2.10.1]
        at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101) ~[pulsar-client-2.10.1.jar:2.10.1]
        at org.springframework.pulsar.core.DefaultPulsarConsumerFactory.createConsumer(DefaultPulsarConsumerFactory.java:104) ~[main/:?]
        at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener.<init>(DefaultPulsarMessageListenerContainer.java:191) ~[main/:?]
        at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer.doStart(DefaultPulsarMessageListenerContainer.java:95) ~[main/:?]
        at org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer.start(AbstractPulsarMessageListenerContainer.java:169) ~[main/:?]
        at org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer.doStart(ConcurrentPulsarMessageListenerContainer.java:83) ~[main/:?]
        at org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer.start(AbstractPulsarMessageListenerContainer.java:169) ~[main/:?]
        at org.springframework.pulsar.config.PulsarListenerEndpointRegistry.startIfNecessary(PulsarListenerEndpointRegistry.java:239) ~[main/:?]
        at org.springframework.pulsar.config.PulsarListenerEndpointRegistry.start(PulsarListenerEndpointRegistry.java:191) ~[main/:?]
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
        at java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]

I am going to merge as @sobychacko is going to build on top of this work. @sobychacko can you follow up w/ the test failure?

@onobc
Copy link
Collaborator

onobc commented Sep 6, 2022

Closed via 25dcfab

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants